{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Intro"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Multi Devices, Single Machine\n",
    "* Check that your GPU cards have NVIDIA Compute Capability >= 3.0\n",
    "* Alternative using AWS: [helpful blog post](http://goo.gl/kbge5b)\n",
    "* Google Cloud service: [uses TPU hardware](https://cloud.google.com/ml)\n",
    "* [Which to use? (Tim Dettmers)](https://goo.gl/pCtSAn)\n",
    "* Download CUDA & cuDNN, set their environment vars\n",
    "* Use the *nvidia-smi* command to check the installation\n",
    "* Install TF with GPU support\n",
    "* Open a Python shell, verify TF detects CUDA & cuDNN\n",
    "\n",
    "    >import tensorflow as tf\n",
    "    \n",
    "    >sess = tf.Session()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import tensorflow as tf\n",
    "\n",
    "config = tf.ConfigProto()\n",
    "#config.gpu_options.per_process_gpu_memory_fraction=0.4\n",
    "config"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Managing GPU RAM\n",
    "* TF grabs all the RAM on every visible GPU the first time a graph runs. To run a 2nd TF program while the 1st is still running, give each process different GPU cards. (Below: program #1 sees GPUs 0,1; program #2 sees GPUs 3,2, in that order, so its `/gpu:0` is physical card 3.)\n",
    "\n",
    "> $ CUDA_VISIBLE_DEVICES=0,1 python3 program_1.py\n",
    "\n",
    "> $ CUDA_VISIBLE_DEVICES=3,2 python3 program_2.py\n",
    "\n",
    "* Option 2: tell TF to grab only a fraction of each GPU's memory. (Below: 40% allocation.)"
   ]
  },
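  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "config = tf.ConfigProto()\n",
    "config.gpu_options.per_process_gpu_memory_fraction = 0.4  # 40% of each GPU's RAM\n",
    "session = tf.Session(config=config)\n",
    "config, session"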
   ]
  },
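  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A rough pure-Python sketch of the renumbering implied by the two `CUDA_VISIBLE_DEVICES` commands earlier in this section: the listed physical cards are exposed to the process in the order given. The helper name `visible_gpu_map` is made up for illustration; it only parses the string and never touches CUDA or TF.\n",
    "\n",
    "```python\n",
    "def visible_gpu_map(cuda_visible_devices):\n",
    "    # Hypothetical helper: map logical device names to physical GPU ids,\n",
    "    # following the order of the comma-separated list.\n",
    "    physical_ids = [int(s) for s in cuda_visible_devices.split(',') if s.strip()]\n",
    "    return {'/gpu:%d' % logical: physical\n",
    "            for logical, physical in enumerate(physical_ids)}\n",
    "\n",
    "program_1 = visible_gpu_map('0,1')\n",
    "program_2 = visible_gpu_map('3,2')\n",
    "print(program_1)\n",
    "print(program_2)\n",
    "```\n",
    "\n",
    "Because the two programs get disjoint sets of cards, neither can grab RAM on the other's GPUs."
   ]
  },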
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Placing Ops on Devices"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "* The [TF Whitepaper](http://goo.gl/vSjA14) describes a dynamic placer algorithm that distributes ops across all available devices. **But it's not available (yet) in open-source TF.**"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Simple Placement\n",
    "* Mostly up to you. To pin ops to a specific device, create them inside a *tf.device()* block. Below: a,b pinned to cpu#0; c can go anywhere."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import tensorflow as tf\n",
    "\n",
    "with tf.device(\"/cpu:0\"):\n",
    "    a,b = tf.Variable(3.0), tf.Variable(4.0)\n",
    "c = a*b\n",
    "c"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Logging Placements\n",
    "* Use *log_device_placement=True*. This tells the placer to log a message whenever it places a node."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import tensorflow as tf\n",
    "\n",
    "config = tf.ConfigProto()\n",
    "config.log_device_placement = True\n",
    "sess = tf.Session(config=config)\n",
    "print(config,\"\\n\",sess)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Dynamic Placement\n",
    "* You can specify a function instead of a device when creating a device block."
   ]
  },
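  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import tensorflow as tf\n",
    "\n",
    "def variables_on_cpu(op):\n",
    "    # variables go to the CPU (the op type is \"VariableV2\" in newer TF 1.x releases)\n",
    "    if op.type in (\"Variable\", \"VariableV2\"):\n",
    "        return \"/cpu:0\"\n",
    "    else:\n",
    "        return \"/gpu:0\"  # everything else goes to the GPU\n",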
    "with tf.device(variables_on_cpu):\n",
    "    a = tf.Variable(3.0)\n",
    "    b = tf.constant(4.0)\n",
    "    c = a * b\n",
    "c"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Ops & Kernels\n",
    "* A TF operation needs a **kernel** (an implementation for a given device type) to run on a device. **Not all ops have kernels for both GPUs and CPUs**. Example: TF doesn't have a GPU kernel for integer variables. Changing i (below) from 3 to 3.0 should allow the op to run."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import tensorflow as tf\n",
    "with tf.device(\"/gpu:0\"):\n",
    "    i = tf.Variable(3)\n",
    "test = sess.run(i.initializer)\n",
    "test"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "* To allow TF to \"fall back\" to a CPU instead, use *allow_soft_placement=True*."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "with tf.device(\"/gpu:0\"):\n",
    "    i = tf.Variable(3)\n",
    "config = tf.ConfigProto()\n",
    "config.allow_soft_placement = True\n",
    "sess = tf.Session(config=config)\n",
    "test = sess.run(i.initializer) # the placer runs and falls back to /cpu:0\n",
    "print(test)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Parallel Execution\n",
    "* TF executes any nodes with zero dependencies first. If those nodes are on **separate** devices, they are run in parallel. If on the same device, they are run in different threads & **may** be run in parallel."
   ]
  },
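  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A minimal pure-Python sketch of the scheduling idea above, assuming a toy graph given as a dict from node name to the nodes it depends on. `ready_waves` is a hypothetical helper, not TF's executor: it groups nodes into waves, where every node in a wave has no unmet dependencies and could in principle run in parallel.\n",
    "\n",
    "```python\n",
    "def ready_waves(deps):\n",
    "    # deps: {node: [nodes it depends on]}\n",
    "    remaining = {node: set(d) for node, d in deps.items()}\n",
    "    waves = []\n",
    "    while remaining:\n",
    "        # Nodes with zero unmet dependencies can be evaluated now.\n",
    "        ready = sorted(n for n, d in remaining.items() if not d)\n",
    "        if not ready:\n",
    "            raise ValueError('cycle detected')\n",
    "        waves.append(ready)\n",
    "        for n in ready:\n",
    "            del remaining[n]\n",
    "        # Mark the finished nodes as satisfied for everything still waiting.\n",
    "        for d in remaining.values():\n",
    "            d.difference_update(ready)\n",
    "    return waves\n",
    "\n",
    "toy_graph = {'a': [], 'b': [], 'c': ['a', 'b'], 'd': ['c'], 'e': ['a']}\n",
    "waves = ready_waves(toy_graph)\n",
    "print(waves)\n",
    "```\n",
    "\n",
    "Whether the nodes inside one wave really run in parallel still depends on where they are placed, as the bullet above says."
   ]
  },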
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Control Dependencies\n",
    "* Use *control dependencies* to postpone the evaluation of some nodes until others have run (ex: to keep memory-hungry ops from hogging RAM prematurely)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import tensorflow as tf\n",
    "a = tf.constant(1.0)\n",
    "b = a + 2.0\n",
    "\n",
    "with tf.control_dependencies([a,b]):\n",
    "    x = tf.constant(3.0)\n",
    "    y = tf.constant(4.0)\n",
    "    \n",
    "print(x+y)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Multiple Devices - Multiple Servers\n",
    "* cluster: >=1 TF servers (\"tasks\") across machines. Tasks belong to **jobs** (collections of related tasks)\n",
    "* \"ps\" = parameter server\n",
    "* \"worker\" = computing engine"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "cluster_spec = tf.train.ClusterSpec({\n",
    "    \"ps\": [\n",
    "        \"machine-a.example.com:2221\",  # /job:ps/task:0\n",
    "    ],\n",
    "    \"worker\": [\n",
    "        \"machine-a.example.com:2222\",  # /job:worker/task:0\n",
    "        \"machine-b.example.com:2222\",  # /job:worker/task:1\n",
    "    ]})\n",
    "\n",
    "cluster_spec"
   ]
  },
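  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# start the TF server for one task of the cluster (here: the first worker)\n",
    "server = tf.train.Server(cluster_spec, job_name=\"worker\", task_index=0)\n",
    "server.join()\n",
    "# blocks main thread until server stops (i.e., never)"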
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Opening a Session"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# NOT YET WORKING\n",
    "# open session\n",
    "#a = tf.constant(1.0)\n",
    "#b = a + 2\n",
    "#c = b * 3\n",
    "#with tf.Session(\"grpc://machine-b.example.com:2222\") as sess:\n",
    "#    print(c.eval()) # 9.0"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Master & Worker Services\n",
    "* Clients use the gRPC protocol to talk to servers: built on HTTP/2, bidirectional\n",
    "* Data is sent as *protocol buffers*\n",
    "* All servers can provide *master* & *worker* services."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Pinning Ops Across Tasks\n",
    "* you can pin ops to any device\n",
    "* ex: "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# NOT WORKING YET\n",
    "#with tf.device(\"/job:ps/task:0/cpu:0\"):\n",
    "#    a = tf.constant(1.0)\n",
    "#with tf.device(\"/job:worker/task:0/gpu:1\"):\n",
    "#    b = a + 2\n",
    "#c = a + b"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Sharding Variables across Multiple Param Servers\n",
    "* sharding across servers mitigates the risk of network card saturation\n",
    "* TF distributes variables across all \"ps\" tasks in round-robin fashion"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "'''NOT WORKING YET\n",
    "import tensorflow as tf\n",
    "with tf.device(tf.train.replica_device_setter(ps_tasks=2)):\n",
    "    v1 = tf.Variable(1.0) # pinned to /job:ps/task:0\n",
    "    v2 = tf.Variable(2.0) # pinned to /job:ps/task:1\n",
    "    v3 = tf.Variable(3.0) # pinned to /job:ps/task:0\n",
    "    v4 = tf.Variable(4.0) # pinned to /job:ps/task:1\n",
    "    v5 = tf.Variable(5.0) # pinned to /job:ps/task:0\n",
    "'''"
   ]
  },
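  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The round-robin pattern in the comments above can be sketched in plain Python. This only mimics the resulting placement; it is not how `replica_device_setter` is implemented, and `round_robin_ps` is a made-up name.\n",
    "\n",
    "```python\n",
    "def round_robin_ps(variable_names, ps_tasks):\n",
    "    # Hypothetical helper: the i-th variable goes to ps task i % ps_tasks.\n",
    "    return {name: '/job:ps/task:%d' % (i % ps_tasks)\n",
    "            for i, name in enumerate(variable_names)}\n",
    "\n",
    "placement = round_robin_ps(['v1', 'v2', 'v3', 'v4', 'v5'], ps_tasks=2)\n",
    "for name in sorted(placement):\n",
    "    print(name, placement[name])\n",
    "```\n",
    "\n",
    "With 2 ps tasks, odd-numbered variables land on task 0 and even-numbered ones on task 1, so parameter traffic is split between the two machines."
   ]
  },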
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Sharing State across Sessions (Resource Containers)\n",
    "* local session: all vars managed by session itself & vanish on end.\n",
    "* distributed session: vars managed by *resource containers* on cluster\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "'''# simple_client.py\n",
    "#import tensorflow as tf\n",
    "#import sys\n",
    "#x = tf.Variable(0.0, name=\"x\")\n",
    "#increment_x = tf.assign(x, x + 1)\n",
    "#with tf.Session(sys.argv[1]) as sess:\n",
    "#    if sys.argv[2:]==[\"init\"]:\n",
    "#        sess.run(x.initializer)\n",
    "#    sess.run(increment_x)\n",
    "#    print(x.eval())\n",
    "'''"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# launches a client which connects to machine B and reuses variable x\n",
    "# (x lives on the cluster, so an earlier client must already have initialized and incremented it)\n",
    "# python3 simple_client.py grpc://machine-b.example.com:2222\n",
    "# 2.0"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Async Communications (TF Queues)\n",
    "\n",
    "* Enqueuing data\n",
    "* Dequeuing data\n",
    "* Queues of tuples\n",
    "* Closing a queue\n",
    "* RandomShuffleQueue\n",
    "* PaddingFIFOQueue\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Loading Data Directly from Graph\n",
    "\n",
    "* Needed to avoid file server (bandwidth) saturation\n",
    "* Preloading data to variables\n",
    "* Reading data from graph with **reader operations**\n",
    "\n",
    "    * CSV, binary, TFRecords\n",
    "    * **TextLineReader** reads file lines one-by-one\n",
    "    * record identifier (string): filename:linenumber\n",
    "    * tf.decode_csv(val, record_defaults=[...])"
   ]
  },
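  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A plain-Python analogy (not TF code) of what a line reader followed by CSV decoding hands back: a string key of the form filename:linenumber, plus the parsed fields, with each empty field replaced by its default. `read_records` is a hypothetical helper, and numbering lines from 1 is an assumption of this sketch.\n",
    "\n",
    "```python\n",
    "def read_records(filename, lines, record_defaults):\n",
    "    # Hypothetical stand-in for a line reader followed by CSV decoding.\n",
    "    # Yields (key, values) where key is 'filename:linenumber'.\n",
    "    for lineno, line in enumerate(lines, start=1):\n",
    "        fields = line.strip().split(',')\n",
    "        values = []\n",
    "        for field, default in zip(fields, record_defaults):\n",
    "            # An empty field falls back to its default; otherwise the\n",
    "            # field is converted to the default's type.\n",
    "            values.append(default if field == '' else type(default)(field))\n",
    "        yield '%s:%d' % (filename, lineno), values\n",
    "\n",
    "lines = ['1.0,2.0,0', '4.0,,1']\n",
    "records = list(read_records('my_test.csv', lines, record_defaults=[0.0, -1.0, 0]))\n",
    "for key, values in records:\n",
    "    print(key, values)\n",
    "```\n",
    "\n",
    "The defaults do double duty here: they fill in missing values and they fix the type of each column."
   ]
  },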
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "'''TO LOAD A GRAPH\n",
    "instance_queue = tf.RandomShuffleQueue(\n",
    "    capacity=10, \n",
    "    min_after_dequeue=2,\n",
    "    dtypes=[tf.float32, tf.int32], \n",
    "    shapes=[[2],[]],\n",
    "    name=\"instance_q\", \n",
    "    shared_name=\"shared_instance_q\")\n",
    "\n",
    "enqueue_instance = instance_queue.enqueue([features, target])\n",
    "close_instance_queue = instance_queue.close()\n",
    "'''"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "'''TO RUN THE GRAPH\n",
    "with tf.Session([...]) as sess:\n",
    "    sess.run(enqueue_filename, feed_dict={filename: \"my_test.csv\"})\n",
    "    sess.run(close_filename_queue)\n",
    "    try:\n",
    "        while True:\n",
    "            sess.run(enqueue_instance)\n",
    "    except tf.errors.OutOfRangeError as ex:\n",
    "        pass # no more records in the current file and no more files to read\n",
    "    sess.run(close_instance_queue)\n",
    "'''\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Multithreaded readers using a Coordinator & QueueRunner"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Other convenience functions\n",
    "* *string_input_producer()*\n",
    "* *tf.train.start_queue_runners()*\n",
    "##### producer functions = create queues\n",
    "* input_producer()\n",
    "* range_input_producer()\n",
    "* slice_input_producer()\n",
    "* shuffle_batch(list_of_tensors)\n",
    "    * returns RandomShuffleQueue\n",
    "    * returns QueueRunner (added to GraphKeys.QUEUE_RUNNERS)\n",
    "    * dequeue_many() = returns minibatch from queue\n",
    "    \n",
    "    * batch() --?\n",
    "    * batch_join() --?\n",
    "    * shuffle_batch_join() --?"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### One NN per Device\n",
    "\n",
    "* near-linear speedup: training 100 nets across 50 servers x 2 GPUs/server takes roughly as long as training 1 net on 1 GPU. (**perfect for hyperparameter tuning**)\n",
    "\n",
    "* potential option: [tf serving, released 2/2016](https://tensorflow.github.io/serving/)"
   ]
  },
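  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The arithmetic behind the speedup claim above, as a small sketch. It assumes every network takes about the same time to train and that the networks never talk to each other; `training_rounds` is a made-up helper name.\n",
    "\n",
    "```python\n",
    "import math\n",
    "\n",
    "def training_rounds(num_networks, num_servers, gpus_per_server):\n",
    "    # Each GPU trains one network at a time, so the wall-clock time is\n",
    "    # roughly (number of rounds) x (time to train one network).\n",
    "    total_gpus = num_servers * gpus_per_server\n",
    "    return math.ceil(num_networks / total_gpus)\n",
    "\n",
    "rounds = training_rounds(100, 50, 2)\n",
    "print(rounds)\n",
    "```\n",
    "\n",
    "With 100 GPUs for 100 networks there is a single round, which is why the total time is about that of one network on one GPU."
   ]
  },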
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### In-Graph vs Between-Graph Replication (for Ensembles)\n",
    "* Two approaches to building ensembles:\n",
    "\n",
    "1) one big graph holding every network, one session, connected to any server in the cluster (\"in-graph replication\")\n",
    "\n",
    "2) one separate graph per network; handle synchronization between the graphs yourself, using queues (\"between-graph replication\") -- considered more flexible\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "#RunOptions ... timeout_in_ms()\n",
    "'''NOT YET\n",
    "with tf.Session([...]) as sess:\n",
    "    [...]\n",
    "    run_options = tf.RunOptions()\n",
    "    run_options.timeout_in_ms = 1000 # 1s timeout\n",
    "    try:\n",
    "        pred = sess.run(dequeue_prediction, options=run_options)\n",
    "    except tf.errors.DeadlineExceededError as ex:\n",
    "        [...] # the dequeue operation timed out after 1s\n",
    "'''"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "#\n",
    "'''NOT YET\n",
    "config = tf.ConfigProto()\n",
    "config.operation_timeout_in_ms = 1000\n",
    "# 1s timeout for every operation\n",
    "with tf.Session([...], config=config) as sess:\n",
    "    [...]\n",
    "    try:\n",
    "        pred = sess.run(dequeue_prediction)\n",
    "    except tf.errors.DeadlineExceededError as ex:\n",
    "        [...] # the dequeue operation timed out after 1s\n",
    "'''"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Model Parallelism\n",
    "* Chopping models, running chunks on different devices\n",
    "* Fully connected nets: not much value in doing this\n",
    "* Vertical & horizontal slicing don't work well either\n",
    "\n",
    "* Nets w/ partially connected layers (CNNs): easier to distribute\n",
    "* Some RNNs use memory cells (a cell's output at time t is fed back as its input at t+1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Data Parallelism\n",
    "* **Sync updates** (aggregator waits for all gradients to be available, averages them, applies the result) - can be held up by slow devices; params can also saturate the parameter servers' bandwidth\n",
    "* **Async updates** - more training steps/minute. Issue: \"stale gradients\" (by the time a gradient is applied, the params it was computed from have already been changed by other replicas) - slows convergence, introduces noise/wobble. To reduce this:\n",
    "    * reduce learning rate\n",
    "    * drop/scale back stale gradients\n",
    "    * adjust minibatch size\n",
    "    * start first few epochs with just one replica (\"warmup phase\")\n",
    "* **Bandwidth** - At some point, adding GPUs stops helping because the network saturates and can't carry more data traffic. [google report](http://goo.gl/E4ypxo). Steps you can take:\n",
    "    * group GPUs on a single server (avoids network hops)\n",
    "    * shard params across servers\n",
    "    * drop precision from float32 to bfloat16\n",
    "    * 8b precision (\"quantization\"): see [mobile phone apps](http://goo.gl/09Cb6v)\n",
    "* **How TF does it** - \n",
    "    * you choose 1) replication type (in-graph, between-graph) and 2) update type (async or sync)\n",
    "        1) in-graph + sync: one big graph\n",
    "        2) in-graph + async: 1 optimizer/replica, 1 thread/replica\n",
    "        3) between-graph + sync: wrap optimizer in **SyncReplicasOptimizer**"
   ]
  },
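  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A minimal sketch of one synchronous update as described above, in plain Python and assuming vanilla gradient descent. It is not what **SyncReplicasOptimizer** does internally; `sync_update` and the toy numbers are made up for illustration.\n",
    "\n",
    "```python\n",
    "def sync_update(params, replica_gradients, learning_rate):\n",
    "    # All replica gradients are already in the list (the 'wait' step),\n",
    "    # so average them element-wise and apply one gradient-descent step.\n",
    "    num_replicas = len(replica_gradients)\n",
    "    mean_grad = [sum(g[i] for g in replica_gradients) / num_replicas\n",
    "                 for i in range(len(params))]\n",
    "    return [p - learning_rate * g for p, g in zip(params, mean_grad)]\n",
    "\n",
    "params = [1.0, 2.0]\n",
    "grads = [[0.5, 1.0], [1.5, 3.0]]  # one gradient vector per replica\n",
    "new_params = sync_update(params, grads, learning_rate=0.5)\n",
    "print(new_params)\n",
    "```\n",
    "\n",
    "An async version would instead apply each replica's gradient as soon as it arrives, which is where stale gradients come from."
   ]
  },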
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python [Root]",
   "language": "python",
   "name": "Python [Root]"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
